Overview
基于Java1.6, 前后的版本实现都有所不同.
确定入口
在通常情况下, 我们是配合线程池获取一个Future
的:1
2
3
4Callable<Integer> callable = () -> {
return 1;
};
Future<Integer> future = Executors.newSingleThreadExecutor().submit(callable);
观察submit
方法内部, 不考虑”池”的概念实际上等同于下面操作:
1 | RunnableFuture<Integer> runnableFuture = new FutureTask<>(callable); |
RunnableFuture
本身实现了Runnable
和Future
. 从而可以确定FutureTask
的两个重要入口方法: run
, get
.
Sync
观察FutureTask
内部, 同样有一个Sync
. 和CountDownLatch
相似, FutureTask
本身也是个共享的同步工具. 由于一个线程被unpark
后, 会继续传播PROPAGATE
, 所以当FutureTask
中的任务执行完成后, 所有调用get
方法的线程都会退出阻塞. CountDownLatch
中state
表示数量, 而在FutureTask
则表示生命周期的状态:
- READY = 0 刚刚
new FutureTask
后的状态. - RUNNING = 1
- RAN = 2 表示执行完毕
- CANCELED = 4
如果强行类比的话, 那么get
就对应CountDownLatch
的await
. run
/cancel
则对应countDown
.
get
调用关系如下:
FutureTask#get
-> Sync#innerGet
-> AQS#acquireSharedInterruptibly(0)
根据我们之前的了解, 如果没有tryAcquireShared(0)
成功, 则当前线程会被挂起. 在此处, tryAcquireShared
的参数是没有任何意义的, 所以形参被取名叫ignore
.
1 | protected int tryAcquireShared(int ignore) { |
innerIsDone
大致为判断了状态是否为RAN
或者CANCELED
, 也就是说, 当其他线程将state
设置为这两种状态之后, 等待线程才能结束运行. 当等待线程被唤醒后, 会做两个判断, 代码非常简单:
1 | if (getState() == CANCELLED) |
run
FutureTask#run
中仅为调用 Sync#innerRun
. 其中执行了callable#call
, 没有异常则set
, 反之则setException
. 分别对应Sync#innerSet
, Sync#innerSetException
.
观察两个方法, 其中的重要内容就是把state
CAS为RAN
, 并doReleaseShared(0)
. 在CountDownLatch
中有提到过这个方法, 子类的重点在于tryReleaseShared
是如何实现的, 一定是返回true
的, 且内容非常简单粗暴:
1 | protected boolean tryReleaseShared(int ignore) { |
可以看到, 这个方法的参数同样是无意义的. 其中的runner
指的是执行FutureTask
的线程, 即调用set
,setException
的线程. 实际上刚刚的innerIsDone
除了判断状态之外, 还要求runner == null
, 对应地, 他在tryReleaseShared
这里被置为null
.
cancel
大体和set
, setException
一致, 都是将state
CAS为一个值, 此处为CANCELED
. 并且执行doReleaseShared
. 除次之外, 如果为cancel(true)
还会根据参数对执行任务的线程进行interrupt
. 但是我们知道interrupt
不一定成功, 观察下面的栗子:
1 | public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { |
可以发现, cancel
之后, get
线程马上就得到响应, 抛出异常了. 而执行线程仍然要在等待几秒后执行完.